package com.qupeng.demo.kafka.kafkaspringbootstreams.common.kafka.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

//@Configuration
/**
 * Stops a {@link KafkaStreams} instance and wipes its local state directory when the
 * owning Spring bean is destroyed.
 *
 * <p>The shutdown is bounded by {@link #CLOSE_TIMEOUT}. Local state is only cleaned up
 * after the streams instance has confirmed that it stopped, because
 * {@link KafkaStreams#cleanUp()} must not be invoked on a running instance.
 */
public class KafkaStreamsCleanupConfig implements DisposableBean {

    /** Maximum time to wait for the streams instance to stop before giving up. */
    private static final Duration CLOSE_TIMEOUT = Duration.ofMillis(5000);

    private final KafkaStreams kafkaStreams;

    /**
     * @param kafkaStreams the streams instance to shut down on bean destruction; a
     *                     {@code null} value turns {@link #destroy()} into a no-op
     */
    public KafkaStreamsCleanupConfig(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    /**
     * Closes the streams instance, waiting at most {@link #CLOSE_TIMEOUT}, and then
     * cleans up its local state.
     *
     * @throws IllegalStateException if the streams instance did not stop within the
     *                               timeout; local state is left untouched in that case
     * @throws Exception             any other failure raised by the close or clean-up call
     */
    @Override
    public void destroy() throws Exception {
        if (kafkaStreams == null) {
            return;
        }

        // close(Duration) reports false when the timeout elapsed before all threads stopped.
        boolean stopped = kafkaStreams.close(CLOSE_TIMEOUT);
        if (!stopped) {
            // Calling cleanUp() now would fail anyway because the instance is still
            // running; fail here with a message that names the actual cause.
            throw new IllegalStateException(
                    "KafkaStreams did not stop within " + CLOSE_TIMEOUT.toMillis()
                            + " ms; skipping local state clean-up");
        }

        kafkaStreams.cleanUp();
    }
}